Netty最佳实践(参考Netty In Action) 您所在的位置:网站首页 netty 算法 Netty最佳实践(参考Netty In Action)

Netty最佳实践(参考Netty In Action)

2023-03-16 01:19| 来源: 网络整理| 查看: 265

使用池化buffer /* *use unpooled buffers with caution 谨慎使用非池化的buffer *allocation/deallocation is slow buffer的分配和回收是很缓慢的 * Use Pooling of buffers to reduce allocation / deallocation time! 创建池化buffer会减少分配和回收时间 * */ Bootstrap bootstrap = new Bootstrap(); bootstrap.option(ChannelOption.ALLOCATOR,PooledByteBufAllocator.DEFAULT); 搜索ButeBuf //使用ButeBuf的forEachByte和ByteBufProcessor代替普通循环查找 SlowSearch :( ByteBuf buf = ...; int index = -1; for (int i = buf.readerIndex(); index == -1 && i index = i;} } FastSearch :) ByteBuf buf = ...; int index = buf.forEachByte(new ByteBufProcessor() {@Overridepublic boolean process(byte value) {return value != '\n';} });ByteBufProcessor is faster because it…​ can eliminate range checks 可以消除范围(边界)检查 ​ can be created and shared 可以共享 ​ easier to inline by the JIT 更容易通过JIT内联 Use it whenever you need to find some pattern in a ByteBuf 当您需要在ByteBuf中查找时,可以使用它 其他buffer相关技巧 ​ alloc() over Unpooled ​ slice(), duplicate() over copy ​ bulk operations over loops 使用零拷贝或者内存映射传输文件内容 //大文件 File file=new File(""); FileInputStream in = new FileInputStream(file); //1 FileRegion region = new DefaultFileRegion(in.getChannel(), 0, file.length()); //2 ctx.channel().writeAndFlush(region).addListener(new ChannelFutureListener() { //3@Overridepublic void operationComplete(ChannelFuture future) throws Exception {if (!future.isSuccess()) {Throwable cause = future.cause(); //4// Do something}} });//=====================================分隔线===================================//This only works if you not need to modify the data on the fly. If so use ChunkedWriteHandler and NioChunkedFile. 仅在不修改数据情况下使用,否则使用ChunkedWriteHandler and NioChunkedFile channel.transforTo(new DefaultFileRegion(fc, 0, fileLength)); 不要在EventLoopGroup的线程里面执行如下操作 //睡眠、阻塞、耗时计算、阻塞的db查询(即耗时操作因交给用户线程,不应该是work线程或者称之为child线程) Thread.sleep() CountDownLatch.await() or any other blocking operation from java.util.concurrent Long-lived computationally intensive operations Blocking operations that might take a while (e.g. DB query) 共享EventLoopGroup线程组 //Share EventLoopGroup between different Bootstraps EventLoopGroup group = new NioEventLoopGroup(); Bootstrap bootstrap = new Bootstrap().group(group); Bootstrap bootstrap2 = new Bootstrap().group(group); //可以共享线程资源,继而达到资源利用最大化 类似于代理应共享同一个线程组group

不应该是如下做法:

//Called once a new connection was accepted 接收新连接时候被调用 //Use a new EventLoopGroup instance to handle the connection to the remote peer 使用一个新的线程组来处理代理和远程服务器的连接 //Don’t do this! This will tie up more resources than needed and introduce extra context-switching overhead. 这样很耗费资源,并引入额外的上下文切换开销public class ProxyHandler extends ChannelInboundHandlerAdapter {@Overridepublic void channelActive(ChannelHandlerContext ctx) { final Channel inboundChannel = ctx.channel();Bootstrap b = new Bootstrap();b.group(new NioEventLooopGroup()); ...ChannelFuture f = b.connect(remoteHost, remotePort);...} }

正确做法:

// Called once a new connection was accepted // Share the same EventLoop between both Channels. This means all IO for both connected Channels are handled by the same Thread. 共享线程,意味着该代理和远程连接的服务的io都由同一个线程处理public class ProxyHandler extends ChannelInboundHandlerAdapter {@Overridepublic void channelActive(ChannelHandlerContext ctx) { final Channel inboundChannel = ctx.channel();Bootstrap b = new Bootstrap();b.group(inboundChannel.eventLoop()); ...ChannelFuture f = b.connect(remoteHost, remotePort);...} }

时刻记着在Application中共享EventLoop 以下开发实例:

//实际在转发中使用同一线程组,以下是在某个server端handler的read方法接收到消息,并将消息转发到另外一个服务端@Override public void channelRead(ChannelHandlerContext ctx, Object s) throws Exception {System.out.println("中转站服务端收到消息 :" + s);/** 重点在于client和server使用同一个group,使用ctx.channel().eventLoop()来获取,切记不能调用sync方法阻塞当前线程,只能添加监听。*/Bootstrap bootstrap = new Bootstrap();bootstrap.channel(NioSocketChannel.class).handler(new ChannelInitializer() {@Overrideprotected void initChannel(SocketChannel socketChannel) throws Exception {ChannelPipeline pipeline = socketChannel.pipeline();pipeline.addLast(new StringDecoder(CharsetUtil.UTF_8)).addLast(new StringEncoder(CharsetUtil.UTF_8)).addLast("client2", new ClientHandler());}}).group(ctx.channel().eventLoop());ChannelFuture connect = bootstrap.connect("127.0.0.1", 6667);connect.addListener(new GenericFutureListener() {@Overridepublic void operationComplete(Future future) throws Exception {System.out.println(connect.channel());connect.channel().writeAndFlush(s);connect.addListener(ChannelFutureListener.CLOSE);}}); } 当在外部调用channel的write方法时

所有write方法,在真正往通道写数据之前,都会检查当前写线程是否是Netty自身EventLoopGroup之中的线程(即Fast,如果是外部的定义为slow),如果是则写,不是则将当前写任务加入到自身的线程队列。所以,所有的channel相关的io方法均是线程安全的,都是由netty自身线程完成的。

Always combine operations if possible when act on the Channel from outside the EventLoop to reduce overhead of wakeups and object creation! 当从EventLoop外部对通道进行操作时,尽可能合并操作,以减少唤醒和对象创建的开销!Not recommended!不推荐的方式channel.write(msg1); channel.writeAndFlush(msg3);Combine for the WIN! 推荐方法,尽量合并写操作channel.eventLoop().execute(new Runnable() {@Overridepublic void run() {channel.write(msg1);channel.writeAndFlush(msg3);} }); 当在内部操作Channel时

Use the shortest path as possible to get the maximal performance.

ctx和channel均有write方法,当使用上下文ctx的时候,数据将会从当前handler流向ChannelPieline的下一个handler进行处理, 而使用channel的时候,则数据是从pipeline的尾端(若为InboundHandler入站处理器)开始一次流向所有handler的。 可以见两种方式流经处理器的数量是不一样的,因此效率也有差异。public class YourHandler extends ChannelInboundHandlerAdapter {@Overridepublic void channelActive(ChannelHandlerContext ctx) {// BAD (most of the times)ctx.channel().writeAndFlush(msg); // GOODctx.writeAndFlush(msg); } } Channel.* methods ⇒ the operation will start at the tail of the ChannelPipeline ChannelHandlerContext.* methods => the operation will start from this `ChannelHandler to flow through the ChannelPipeline. 使用Shareable共享Handler

Share ChannelHandlers if stateless 若果handler无状态,那么可以共享使用同一个实例,还可以减少GC

如果ChannelHandler被注解为 @Sharable,全局只有一个handler实例,它会被多个Channel的Pipeline共享,会被多线程并发调用,因此它不是线程安全的;如果存在跨ChannelHandler的实例级变量共享,需要特别注意,它可能不是线程安全的。在整个ChannelPipeline执行过程中,可能会发生线程切换。此时,如果同一个对象在多个ChannelHandler中被共享,可能会被多线程并发操作@ChannelHandler.Shareable public class StatelessHandler extends ChannelInboundHandlerAdapter {@Overridepublic void channelActive(ChannelHandlerContext ctx) {logger.debug("Now client from " + ctx.channel().remoteAddress().toString());} }public class MyInitializer extends ChannelInitializer {private static final ChannelHandler INSTANCE = new StatelessHandler();@Overridepublic void initChannel(Channel ch) {ch.pipeline().addLast(INSTANCE);} }Annotate ChannelHandler that are stateless with @ChannelHandler.Shareable and use the same instance accross Channels to reduce GC. 移除不再需要的handler,以减少遍历带来的开销 Remove ChannelHandler once not needed anymorepublic class OneTimeHandler extends ChannelInboundHandlerAdapter {@Overridepublic void channelActive(ChannelHandlerContext ctx) {doOneTimeAction();ctx.channel().pipeline().remove(this); } }Remove ChannelHandler once not needed anymore. This keeps the ChannelPipeline as short as possible and so eliminate overhead of traversing as much as possible. 当通道就绪且有数据时,是否自动从通道读取数据 To auto-read or not to auto-read By default Netty will keep on reading data from the Channel once something is ready.Need more fine grained control ?channel.config().setAutoRead(false); channel.read(); channel.config().setAutoRead(true); Disable auto read == no more data will be read automatically of this Channel. Tell the Channel to do one read operation once new data is ready Enable again auto read == Netty will automatically read again This can also be quite useful when writing proxy like applications! 比NIO更快的传输 性能上从低到高如下:1) OioSocketChannel:传统,阻塞式编程。 2) NioSocketChannel:select/poll或者epoll,jdk 7之后linux下会自动选择epoll。 3) EpollSocketChannel:epoll,仅限linux,提供更多额外选项。 4) EpollDomainSocketChannel:ipc模式,仅限客户端、服务端在相同主机的情况,从4.0.26版本开始支持,见https://github.com/netty/netty/pull/3344。Switching to native transport is easyUsing NIO transportBootstrap bootstrap = new Bootstrap().group(new NioEventLoopGroup()); bootstrap.channel(NioSocketChannel.class);Using native transportBootstrap bootstrap = new Bootstrap().group(new EpollEventLoopGroup()); bootstrap.channel(EpollSocketChannel.class); 更好的解码器 ReplayingDecoder extends ByteToMessageDecoder 不再需要判断是否有可读字节,比如下面这样public class IntegerHeaderFrameDecoder extends ByteToMessageDecoder {@Overrideprotected void decode(ChannelHandlerContext ctx,ByteBuf buf, List out) throws Exception {if (buf.readableBytes() buf.resetReaderIndex();return;}out.add(buf.readBytes(length));}}


【本文地址】

公司简介

联系我们

今日新闻

    推荐新闻

    专题文章
      CopyRight 2018-2019 实验室设备网 版权所有